---
title: "Performance Optimization"
description: "Optimize mcp_use performance for production workloads"
icon: "zap"
---

This guide covers performance optimization techniques for mcp_use deployments, from basic tuning to advanced scaling strategies.

## Server Management Optimization

### Enable Server Manager

The most impactful performance optimization is enabling the server manager:

```python
# ❌ Poor performance - all servers start immediately
agent = MCPAgent(llm=llm, client=client, use_server_manager=False)

# ✅ Better performance - servers start only when needed
agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
```

**Benefits**:
- **Lazy loading**: Servers start only when their tools are needed
- **Resource efficiency**: Lower memory and CPU usage
- **Faster startup**: Agent initialization completes quickly

### Limit Concurrent Servers

Control resource usage by limiting concurrent server connections:

```python
agent = MCPAgent(
    llm=llm,
    client=client,
    use_server_manager=True,
    max_concurrent_servers=3,  # Limit to 3 active servers
    server_startup_timeout=30  # Faster timeout for stuck servers
)
```

## Tool Optimization

### Restrict Tool Access

Reduce decision complexity by limiting available tools:

```python
# Method 1: Whitelist specific tools
agent = MCPAgent(
    llm=llm,
    client=client,
    allowed_tools=["file_read", "file_write", "web_search"],
    use_server_manager=True
)

# Method 2: Blacklist problematic tools
agent = MCPAgent(
    llm=llm,
    client=client,
    disallowed_tools=["system_execute", "dangerous_operation"],
    use_server_manager=True
)

# Method 3: Server-level filtering
agent = MCPAgent(
    llm=llm,
    client=client,
    allowed_servers=["filesystem", "playwright"],  # Only these servers
    use_server_manager=True
)
```

### Tool Caching

Implement tool result caching for expensive operations:

```python
from functools import lru_cache
import hashlib

class CachedMCPAgent(MCPAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._tool_cache = {}

    @lru_cache(maxsize=100)
    def _cache_key(self, tool_name: str, inputs: str) -> str:
        """Generate cache key for tool execution"""
        content = f"{tool_name}:{inputs}"
        return hashlib.md5(content.encode()).hexdigest()

    async def _execute_tool_cached(self, tool_name: str, inputs: dict):
        """Execute tool with caching"""
        cache_key = self._cache_key(tool_name, str(sorted(inputs.items())))

        if cache_key in self._tool_cache:
            return self._tool_cache[cache_key]

        result = await super()._execute_tool(tool_name, inputs)
        self._tool_cache[cache_key] = result
        return result

# Usage
agent = CachedMCPAgent(llm=llm, client=client, use_server_manager=True)
```

## LLM Optimization

### Choose Faster Models

Balance capability with speed:

<Tabs>
  <Tab title="OpenAI">
    ```python
    # Fastest
    llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.1)

    # Balanced
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)

    # Most capable (slower)
    llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
    ```
  </Tab>
  <Tab title="Anthropic">
    ```python
    # Fastest
    llm = ChatAnthropic(model="claude-3-haiku-20240307", temperature=0.1)

    # Balanced
    llm = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=0.1)

    # Most capable (slower)
    llm = ChatAnthropic(model="claude-3-opus-20240229", temperature=0.1)
    ```
  </Tab>
  <Tab title="Groq">
    ```python
    # Very fast inference
    llm = ChatGroq(
        model="llama3-8b-8192",  # Smaller, faster
        temperature=0.1,
        max_tokens=1000  # Limit response length
    )

    # Balanced
    llm = ChatGroq(model="llama3-70b-8192", temperature=0.1)
    ```
  </Tab>
</Tabs>

### Optimize LLM Parameters

```python
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.1,  # Lower temperature for more focused responses
    max_tokens=500,   # Limit response length
    streaming=True,   # Enable streaming for perceived speed
    request_timeout=30,  # Reasonable timeout
    max_retries=2     # Limit retry attempts
)
```

### Connection Pooling

For high-throughput scenarios, implement connection pooling:

```python
import asyncio
from langchain_openai import ChatOpenAI

class PooledLLM:
    def __init__(self, model="gpt-4o-mini", pool_size=5):
        self.pool = asyncio.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            llm = ChatOpenAI(model=model, temperature=0.1)
            self.pool.put_nowait(llm)

    async def get_llm(self):
        return await self.pool.get()

    async def return_llm(self, llm):
        await self.pool.put(llm)

# Usage
llm_pool = PooledLLM(pool_size=3)

async def create_agent():
    llm = await llm_pool.get_llm()
    try:
        agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
        return agent
    finally:
        await llm_pool.return_llm(llm)
```

## Configuration Optimization

### Server Configuration Tuning

Optimize individual server configurations:

```json
{
  "mcpServers": {
    "playwright": {
      "command": "npx",
      "args": [
        "@playwright/mcp@latest",
        "--headless=true",
        "--timeout=10000"
      ],
      "env": {
        "PLAYWRIGHT_BROWSERS_PATH": "/opt/playwright",
        "PLAYWRIGHT_SKIP_BROWSER_DOWNLOAD": "true"
      }
    },
    "filesystem": {
      "command": "mcp-server-filesystem",
      "args": [
        "/workspace",
        "--readonly=false",
        "--max-file-size=10MB"
      ]
    }
  }
}
```

### Environment Variables

Set performance-related environment variables:

```bash
# Node.js optimization
export NODE_ENV=production
export NODE_OPTIONS="--max-old-space-size=2048"

# Python optimization
export PYTHONOPTIMIZE=2
export PYTHONDONTWRITEBYTECODE=1

# MCP-specific
export MCP_TIMEOUT=30
export MCP_MAX_RETRIES=2
```

## Memory Management

### Monitor Memory Usage

Track memory consumption:

```python
import psutil
import asyncio

class MemoryMonitor:
    def __init__(self, threshold_mb=500):
        self.threshold_mb = threshold_mb
        self.initial_memory = psutil.Process().memory_info().rss / 1024 / 1024

    def check_memory(self):
        current_memory = psutil.Process().memory_info().rss / 1024 / 1024
        memory_used = current_memory - self.initial_memory

        if memory_used > self.threshold_mb:
            print(f"Warning: Memory usage {memory_used:.1f}MB exceeds threshold")

        return memory_used

# Usage
monitor = MemoryMonitor(threshold_mb=300)

async def run_agent_with_monitoring():
    agent = MCPAgent(llm=llm, client=client, use_server_manager=True)

    result = await agent.run("Your query here")
    memory_used = monitor.check_memory()

    return result, memory_used
```

### Garbage Collection

Force garbage collection for long-running processes:

```python
import gc
import asyncio

async def run_multiple_queries(queries):
    agent = MCPAgent(llm=llm, client=client, use_server_manager=True)

    results = []
    for i, query in enumerate(queries):
        result = await agent.run(query)
        results.append(result)

        # Garbage collect every 10 queries
        if i % 10 == 0:
            gc.collect()

    return results
```

## Async Optimization

### Concurrent Processing

Process multiple queries concurrently:

```python
async def process_queries_concurrently(queries, max_concurrent=3):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_single_query(query):
        async with semaphore:
            agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
            return await agent.run(query)

    tasks = [process_single_query(query) for query in queries]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    return results

# Usage
queries = ["Query 1", "Query 2", "Query 3", "Query 4", "Query 5"]
results = await process_queries_concurrently(queries, max_concurrent=2)
```

### Connection Reuse

Reuse MCP client connections:

```python
class AgentPool:
    def __init__(self, pool_size=3):
        self.pool_size = pool_size
        self.clients = []
        self.available_clients = asyncio.Queue()

    async def initialize(self):
        for _ in range(self.pool_size):
            client = MCPClient.from_config_file("config.json")
            await client.create_all_sessions()  # Pre-create sessions
            self.clients.append(client)
            await self.available_clients.put(client)

    async def get_agent(self):
        client = await self.available_clients.get()
        llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)
        return MCPAgent(llm=llm, client=client, use_server_manager=True)

    async def return_client(self, agent):
        await self.available_clients.put(agent.client)

# Usage
pool = AgentPool(pool_size=3)
await pool.initialize()

agent = await pool.get_agent()
result = await agent.run("Your query")
await pool.return_client(agent)
```

## Production Deployment

### Docker Optimization

Optimize Docker configuration:

```dockerfile
FROM python:3.9-slim

# Install Node.js for MCP servers
RUN apt-get update && apt-get install -y \
    nodejs npm \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Pre-install common MCP servers
RUN npm install -g @playwright/mcp playwright

# Copy application
COPY . .

# Set performance environment variables
ENV NODE_ENV=production
ENV PYTHONOPTIMIZE=2
ENV PYTHONDONTWRITEBYTECODE=1

# Run with limited resources
CMD ["python", "-O", "main.py"]
```

### Kubernetes Scaling

Configure horizontal pod autoscaling:

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-use-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-use-app
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
```

## Monitoring and Profiling

### Performance Metrics

Track key performance indicators:

```python
import time
from dataclasses import dataclass
from typing import List

@dataclass
class PerformanceMetrics:
    query_time: float
    server_startup_time: float
    tool_execution_time: float
    memory_usage_mb: float
    tools_used: List[str]

class PerformanceTracker:
    def __init__(self):
        self.metrics = []

    async def track_agent_run(self, agent, query):
        start_time = time.time()
        initial_memory = psutil.Process().memory_info().rss / 1024 / 1024

        # Track server startup time
        server_start = time.time()
        if hasattr(agent, 'client'):
            await agent.client.ensure_connected()
        server_startup_time = time.time() - server_start

        # Run query
        result = await agent.run(query)

        # Calculate metrics
        total_time = time.time() - start_time
        final_memory = psutil.Process().memory_info().rss / 1024 / 1024

        metrics = PerformanceMetrics(
            query_time=total_time,
            server_startup_time=server_startup_time,
            tool_execution_time=total_time - server_startup_time,
            memory_usage_mb=final_memory - initial_memory,
            tools_used=getattr(agent, '_tools_used', [])
        )

        self.metrics.append(metrics)
        return result, metrics

    def get_average_metrics(self):
        if not self.metrics:
            return None

        return {
            'avg_query_time': sum(m.query_time for m in self.metrics) / len(self.metrics),
            'avg_memory_usage': sum(m.memory_usage_mb for m in self.metrics) / len(self.metrics),
            'avg_tools_per_query': sum(len(m.tools_used) for m in self.metrics) / len(self.metrics)
        }

# Usage
tracker = PerformanceTracker()
result, metrics = await tracker.track_agent_run(agent, "Your query")
print(f"Query completed in {metrics.query_time:.2f}s using {metrics.memory_usage_mb:.1f}MB")
```

## Troubleshooting Performance Issues

### Common Performance Problems

<AccordionGroup>
  <Accordion title="Slow agent startup">
    **Causes**: All servers starting simultaneously, large server dependencies

    **Solutions**:
    - Enable server manager: `use_server_manager=True`
    - Pre-install server dependencies
    - Use lighter server alternatives
  </Accordion>

  <Accordion title="High memory usage">
    **Causes**: Multiple large servers, memory leaks, large tool outputs

    **Solutions**:
    - Limit concurrent servers: `max_concurrent_servers=3`
    - Implement garbage collection
    - Restrict tool output size
  </Accordion>

  <Accordion title="Tool execution timeouts">
    **Causes**: Slow servers, network issues, large operations

    **Solutions**:
    - Increase timeouts: `timeout=60`
    - Optimize server configurations
    - Break large operations into smaller chunks
  </Accordion>
</AccordionGroup>

## Next Steps

<CardGroup cols={3}>
  <Card title="Common Issues" icon="circle-help" href="/troubleshooting/common-issues">
    Troubleshoot specific problems and error messages
  </Card>
  <Card title="Security Guide" icon="shield" href="/advanced/security">
    Implement secure, production-ready configurations
  </Card>
  <Card title="Multi-Server Setup" icon="server" href="/advanced/multi-server-setup">
    Optimize complex multi-server deployments
  </Card>
</CardGroup>

<Tip>
Start with enabling the server manager and restricting tools - these two changes alone can improve performance by 50-80% in most cases.
</Tip>
